#!/usr/bin/env python

import subprocess
import threading
import Queue as queue
import os
import sys
import signal


# Settings supplied through the environment; each is None when its variable is unset.
# CLICKHOUSE_CLIENT is the client command line that the send_query helpers split on whitespace.
CLICKHOUSE_CLIENT = os.getenv('CLICKHOUSE_CLIENT')
# The two values below are read here but not referenced in the visible part of this script.
CLICKHOUSE_CURL = os.getenv('CLICKHOUSE_CURL')
CLICKHOUSE_URL = os.getenv('CLICKHOUSE_URL')


def send_query(query):
    """Start the ClickHouse client with `--query <query>` and return its output pipe.

    The command line comes from CLICKHOUSE_CLIENT, split on whitespace. The
    child's stderr is merged into stdout, so the returned file object yields
    both streams. The call does not wait for the client to finish; callers
    typically `.read()` the returned pipe to block until it closes.
    """
    argv = CLICKHOUSE_CLIENT.split() + ['--query', query]
    client = subprocess.Popen(
        argv,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
    return client.stdout


def send_query_in_process_group(query):
    """Start the ClickHouse client for `query` in its own session and return the Popen object.

    In addition to `--query <query>`, the client is given
    `--live_view_heartbeat_interval=1` and `--progress`. The child calls
    os.setsid() before exec, so it leads a new session/process group and can
    be signalled as a group without affecting this script. stderr is merged
    into stdout, which is exposed as a pipe on the returned object.
    """
    client_args = ['--query', query, '--live_view_heartbeat_interval=1', '--progress']
    argv = CLICKHOUSE_CLIENT.split() + client_args
    return subprocess.Popen(
        argv,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        preexec_fn=os.setsid,
    )


def read_lines_and_push_to_queue(pipe, queue):
    """Forward every line read from `pipe` to `queue`, then push None as an end marker.

    Args:
        pipe: file-like object with a `readline()` method (text or bytes).
        queue: object with a `put()` method; receives each line with
            surrounding whitespace stripped, followed by a final None.

    A KeyboardInterrupt while reading is swallowed. Any other exception
    propagates, but the None marker is still pushed first so that a consumer
    blocked on the queue is always released.
    """
    try:
        while True:
            line = pipe.readline()
            # readline() returns an empty str/bytes only at end of stream (a
            # blank line still carries its newline), so a truthiness check
            # terminates correctly for both text and byte pipes.
            if not line:
                break
            # Keep anything already written to stdout visible promptly.
            sys.stdout.flush()
            queue.put(line.strip())
    except KeyboardInterrupt:
        pass
    finally:
        queue.put(None)


def test():
    """Check `WATCH` output of a live view (with --progress) and interruption by SIGINT.

    Steps:
      1. Recreate table test.mt and live view test.lv
         (`SELECT sum(a) FROM test.mt`).
      2. Run `WATCH test.lv` in a client started by
         send_query_in_process_group and collect its output lines on a
         background thread.
      3. Assert on the first output line, then on the line produced after
         inserting three rows.
      4. Send SIGINT to the client's process group, insert more rows, wait
         for the next item from the reader, and drop both tables.

    Raises:
        AssertionError: if a watched output line does not match expectations.
    """
    send_query('DROP TABLE IF EXISTS test.lv').read()
    send_query('DROP TABLE IF EXISTS test.mt').read()
    send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read()
    send_query('CREATE LIVE VIEW test.lv WITH TIMEOUT AS SELECT sum(a) FROM test.mt').read()

    q = queue.Queue()
    p = send_query_in_process_group('WATCH test.lv')
    thread = threading.Thread(target=read_lines_and_push_to_queue, args=(p.stdout, q))
    thread.start()

    # First line reported for the still-empty table.
    line = q.get()
    assert (line.endswith('0\t1'))
    assert ('Progress: 0.00 rows' in line)

    # After the insert the watched sum becomes 1 + 2 + 3 = 6.
    send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read()
    line = q.get()
    assert (line.endswith('6\t2'))
    assert ('Progress: 1.00 rows' in line)

    # Send Ctrl+C to client.
    os.killpg(os.getpgid(p.pid), signal.SIGINT)
    # This insert shouldn't affect lv.
    send_query('INSERT INTO test.mt VALUES (7),(8),(9)').read()
    # Block until the reader delivers its next item (another output line or
    # the end-of-stream None); the value is intentionally not asserted.
    q.get()

    # Clean up both objects created above (the view first, then its source table).
    send_query('DROP TABLE if exists test.lv').read()
    send_query('DROP TABLE if exists test.mt').read()

    thread.join()
    # The reader thread has drained stdout to EOF; reap the client process.
    p.wait()

# Run the scenario immediately when this file is executed; a failed assertion
# propagates as an uncaught AssertionError.
test()
